#include "config.h"

#include <fcntl.h>
#include <sys/types.h>
#include <unistd.h>

#define DBG_SUBSYS S_LIBSTORAGE

#include "dbg.h"
#include "volume_proto.h"

#include "lsv_lib.h"
#include "lsv_wbuffer_internal.h"
#include "lsv_bitmap_snap.h"
#include "lsv_wbuffer.h"

/**
 * @brief Write one fragment, padding unaligned fragments up to a full page.
 *
 * Two phases:
 * - completion (read-modify-write of the partial page) [optional]
 * - full-page write [fill only]
 *
 * @details Concurrency: across IOs and within one IO.
 *
 * Different tasks do not share buffers but may hit the same LBA, so
 * same-LBA operations are serialized and must keep FIFO semantics.
 *
 * buffer pop is ordered: it must not be reordered or run concurrently.
 * Pages within one IO are serialized, so pop order is preserved there.
 *
 * Each IO behaves like a transaction; multi-page IOs need serializability,
 * enforced by per-LBA 2PL locking in the callers.
 *
 * @param arg wbuf_io_frag_t* describing the fragment
 * @return 0 on success, negative error code on failure
 */
int lsv_page_write_with_align(void *arg) {
        int rc = 0;
        buffer_t page_buf;
        buffer_t *use_buf;
        wbuf_io_frag_t *io_frag = arg;
        lsv_volume_proto_t *lsv_info = io_frag->lsv_info;

        uint64_t offset = io_frag->off;
        uint32_t size = io_frag->size;

        uint32_t page_before = offset % LSV_PAGE_SIZE;

        /* BUGFIX: the previous check
         *     YASSERT(page_before + size + page_after == LSV_PAGE_SIZE)
         * was a tautology: page_after was *derived* as
         * LSV_PAGE_SIZE - page_before - size, so with unsigned wraparound the
         * sum always equals LSV_PAGE_SIZE and an oversized fragment would
         * underflow page_after silently.  Validate the inputs first. */
        YASSERT(size <= LSV_PAGE_SIZE);
        YASSERT(page_before + size <= LSV_PAGE_SIZE);

        uint32_t page_after = LSV_PAGE_SIZE - page_before - size;
        uint64_t lba = offset - page_before;

        DINFO("vol %llu lsn %llu off %llu lba %llu <%u, %u, %u>\n",
               (LLU)lsv_info->ino,
               (LLU)io_frag->io->lsn,
               (LLU)offset,
               (LLU)lba,
               page_before,
               size,
               page_after);

        int is_page_aligned = (0 == page_before && 0 == page_after);
        if (is_page_aligned) {
                /* Fully aligned: write straight out of the caller's buffer. */
                use_buf = io_frag->buf;
                YASSERT(use_buf->len >= LSV_PAGE_SIZE);
        } else {
                // read-change-write: assemble a full page in page_buf

                // @malloc ERR1
                mbuffer_init(&page_buf, 0);

                if (page_before > 0) {
                        /* Head completion: read [lba, offset). */
                        rc = lsv_page_read_nolock(io_frag->lsv_info, lba, page_before, NULL, &page_buf);
                        if (unlikely(rc)) {
                                DERROR("readpage_lsv err.errno:%d\n", rc);
                                GOTO(ERR1, rc);
                        }

                        YASSERT(page_buf.len == page_before);
                }

                /* Move the new payload behind the head padding. */
                rc = mbuffer_pop(io_frag->buf, &page_buf, size);
                if (unlikely(rc)) {
                        DERROR("mbuffer_pop err.errno:%d\n", rc);
                        GOTO(ERR1, rc);
                }

                if (page_after > 0) {
                        /* Tail completion: read [offset + size, lba + PAGE). */
                        rc = lsv_page_read_nolock(io_frag->lsv_info, offset + size, page_after, NULL, &page_buf);
                        if (unlikely(rc)) {
                                DERROR("readpage_lsv err.errno:%d\n", rc);
                                GOTO(ERR1, rc);
                        }
                }

                use_buf = &page_buf;
                YASSERT(use_buf->len == LSV_PAGE_SIZE);
        }

        // Full-page write.
        // The fill case differs: its write location was already allocated
        // during prepare, so it must not allocate again.
        if (io_frag->is_fill) {
                // fill mode must not take data from io_frag_ctx->pop_buf
                rc = lsv_wbuffer_chunk_append_page_just_do_it(io_frag, use_buf);
        } else {
                lsv_io_opt_t opt;
                lsv_io_opt_init(&opt, io_frag->io->lsn, lba, LSV_PAGE_SIZE);

                rc = lsv_wbuffer_append_page(io_frag->lsv_info, &opt, use_buf);
        }

ERR1:
        if (!is_page_aligned) {
                mbuffer_free(&page_buf);
        }

        if (io_frag->wakeup) {
                schedule_resume(&io_frag->task, rc, NULL);
        }

        return rc;
}

/**
 * @brief Write one IO fragment, serialized per LBA.
 *
 * Within an IO, pages are processed sequentially (there is a yield).
 * Across IOs, operations on the same LBA are serialized (read and write);
 * the logic is identical for 512-byte and 4K fragments.
 *
 * @yield this routine can be interrupted; the concern is not concurrency
 *        itself but serializability of conflicting same-LBA operations
 * @param io_frag fragment to write.  OWNERSHIP: freed here on the
 *        WBUF_USE_LTABLE path; callers must not touch it afterwards.
 * @return fragment size on success, negative error code on failure
 */
int lsv_page_write(wbuf_io_frag_t *io_frag) {
        int rc = 0;

        /* NOTE(review): non-atomic static counter — only valid while this
         * runs on a single scheduler thread. */
        static int __enter_count = 0;
        __enter_count++;
        /* BUGFIX: __enter_count is a signed int; %u was the wrong specifier. */
        DINFO("__enter_count %d\n", __enter_count);

        // Updating wbuf here does not follow WAL order; for a given LBA,
        // overwrites stay ordered (a partial order is preserved).
        uint64_t offset = io_frag->off;
        uint32_t size = io_frag->size;

        // lba_kv_insert relies on .lba and assumes page alignment
        io_frag->lba = OFF2LBA(offset);

#if WBUF_USE_LTABLE
        io_frag->wakeup = FALSE;

        rc = lsv_page_write_with_align(io_frag);
        if (rc < 0) {
                GOTO(ERR1, rc);
        }

        /* BUGFIX: dropped the redundant NULL guard — io_frag was already
         * dereferenced above, so it cannot be NULL here. */
        yfree((void **)&io_frag);
#else
        /* BUGFIX: this branch referenced an undeclared `lsv_info`; derive the
         * volume from the fragment instead. */
        lba_kv_insert(io_frag->lsv_info->io_hash, lsv_page_write_with_align, io_frag, 1);
        rc = schedule_yield("LBA", NULL, NULL);
        if (rc < 0) {
                GOTO(ERR1, rc);
        }
#endif

        // io_frag may already be freed here; do not dereference it
        __enter_count--;
        return size;
ERR1:
        __enter_count--;
        return rc;
}

/**
 * @brief Allocate and zero-initialize one write-buffer chunk.
 *
 * The new chunk id is taken from the volume's shared, monotonically
 * incremented chunk index.  Every page is pre-linked back to its chunk.
 *
 * @param lsv_info volume the chunk belongs to
 * @param chunk [out] receives the new chunk (NULL until success)
 * @return 0 (allocation failure currently asserts; see TODOs)
 */
int lsv_wbuffer_chunk_malloc(lsv_volume_proto_t *lsv_info, lsv_wbuf_chunk_t **chunk) {
        int ret = 0;
        lsv_chunk_buf_t *chunk_buf;
        lsv_wbuf_chunk_t *_chunk;

        *chunk = NULL;

        uint32_t id = (++lsv_info->share.wbuf_chunk_idx);

        // @malloc ERR1
        ret = ymalloc((void **)&_chunk, sizeof(lsv_wbuf_chunk_t));
        if (unlikely(ret)) {
                // TODO proper error handling instead of assert
                YASSERT(0);
        }
        memset(_chunk, 0, sizeof(lsv_wbuf_chunk_t));

        // @malloc ERR2
        /* BUGFIX: cast was (void *) which mismatched ymalloc's (void **)
         * out-parameter contract (compare the allocation above). */
        ret = ymalloc((void **)&chunk_buf, sizeof(lsv_chunk_buf_t));
        if (unlikely(ret)) {
                YASSERT(0);
        }
        memset(chunk_buf, 0, sizeof(lsv_chunk_buf_t));
        chunk_buf->page_idx = 0;
        chunk_buf->fill_count = 0;
        chunk_buf->id = id;

        INIT_LIST_HEAD(&_chunk->hook);
        _chunk->chunk_buf = chunk_buf;
        _chunk->id = id;

        /* Pre-link each page to its owning chunk. */
        for (int i=0; i < LSV_WBUF_PAGE_NUM; i++) {
                chunk_page_t *p = &_chunk->pages[i];
                INIT_LIST_HEAD(&p->hook);
                p->lba = 0;
                p->chunk = _chunk;
                p->page_idx = 0;
        }

        *chunk = _chunk;

        return 0;
}

/* Free the volume's wbuf chunks.  Currently a no-op: chunks live for the
 * volume's lifetime and are never released individually. */
int lsv_wbuffer_chunk_free(lsv_volume_proto_t *lsv_info){
        (void) lsv_info;

        return 0;
}

/**
 * @brief Build the fixed-size write-buffer chunk pool.
 *
 * Initializes free/wait lists, resets the page-history tracker, and
 * pre-allocates WBUF_CHUNK_NUM chunks onto the free list.
 *
 * @param lsv_info volume whose wbuf is initialized
 * @return 0
 */
int lsv_wbuffer_chunk_init(lsv_volume_proto_t *lsv_info) {
        int ret;
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_chunk_t *chunk;

        count_list_init(&wbuf->free_list);
        count_list_init(&wbuf->wait_list);

        /* No page has been appended yet; see __check_page_history. */
        wbuf->last_chunk = NULL;
        wbuf->last_page_idx = -1;

        // Populate the fixed-length wbuf pool
        for (int i=0; i < WBUF_CHUNK_NUM; i++) {
                /* BUGFIX: the allocation result was silently ignored. */
                ret = lsv_wbuffer_chunk_malloc(lsv_info, &chunk);
                if (unlikely(ret)) {
                        YASSERT(0);
                }
                count_list_add_tail(&chunk->hook, &wbuf->free_list);
        }

        return 0;
}

/* Tear down the chunk pool.  Currently a no-op counterpart of
 * lsv_wbuffer_chunk_init; actual reclamation is not implemented. */
int lsv_wbuffer_chunk_destroy(lsv_volume_proto_t *lsv_info) {
        (void) lsv_info;

        return 0;
}

/**
 * @brief Hand the free-list head over to the wait list (enter pre-commit).
 *
 * @callby full | flush (push_log)
 *
 * @param lsv_info volume owning the lists
 * @param chunk chunk being handed over; must be CLEAN and hold >= 1 page
 * @return 0
 */
static inline int __move_free_list_head(lsv_volume_proto_t *lsv_info, lsv_wbuf_chunk_t *chunk) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_chunk_buf_t *cb = chunk->chunk_buf;

        DINFO_NOP("move free_list head, chunk %p %u page_idx %u\n", chunk, chunk->id, cb->page_idx);
        YASSERT(cb->in_use == LSV_WBUF_CHUNK_CLEAN);
        YASSERT(cb->page_idx > 0);

        /* Re-home the chunk: free_list -> wait_list. */
        count_list_del_init(&chunk->hook, &wbuf->free_list);
        count_list_add_tail(&chunk->hook, &wbuf->wait_list);

        cb->in_use = LSV_WBUF_CHUNK_PRE_COMMIT;

        return 0;
}

/**
 * @brief Return a committed chunk from the wait list to the free list.
 *
 * @callby gc
 *
 * @param lsv_info volume owning the lists
 * @param chunk chunk that has finished commit (POST_COMMIT)
 * @return 0
 */
static inline int __move_wait_list_head(lsv_volume_proto_t *lsv_info, lsv_wbuf_chunk_t *chunk) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        /* BUGFIX: chunk->id is unsigned (uint32_t); %d was the wrong
         * conversion — use %u, matching __move_free_list_head. */
        DINFO_NOP("move wait_list head, chunk %u %p page_idx %u\n", chunk->id, chunk, chunk->chunk_buf->page_idx);
        YASSERT(chunk->chunk_buf->in_use == LSV_WBUF_CHUNK_POST_COMMIT);

        count_list_del_init(&chunk->hook, &wbuf->wait_list);
        count_list_add_tail(&chunk->hook, &wbuf->free_list);

        /* in_use deliberately stays POST_COMMIT: it flips to CLEAN lazily in
         * __reuse_free_list_head via lsv_wb_hash_index_gc_v2. */

        return 0;
}

/**
 * @brief Lazily recycle the free-list head before reuse.
 *
 * Reclamation is deferred until the chunk is about to be reused: if the
 * head is still POST_COMMIT, garbage-collect its hash index, which resets
 * it to CLEAN with page_idx == 0.
 *
 * @param lsv_info volume owning the free list
 * @return 0
 */
static inline int __reuse_free_list_head(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        if (list_empty_careful(&wbuf->free_list.list)) {
                return 0;
        }

        lsv_wbuf_chunk_t *chunk = list_entry(wbuf->free_list.list.next, lsv_wbuf_chunk_t, hook);
        if (chunk->chunk_buf->in_use != LSV_WBUF_CHUNK_POST_COMMIT) {
                return 0;
        }

        DINFO_NOP("reuse free_list head, chunk %u %p page_idx %d\n",
              chunk->id,
              chunk,
              chunk->chunk_buf->page_idx);

        lsv_wb_hash_index_gc_v2(chunk);
        YASSERT(chunk->chunk_buf->in_use == LSV_WBUF_CHUNK_CLEAN);
        YASSERT(chunk->chunk_buf->page_idx == 0);

        return 0;
}

/* Return the free-list head (the chunk currently accepting writes),
 * or NULL when the free list is empty. */
inline lsv_wbuf_chunk_t *get_current_chunk(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        if (list_empty_careful(&wbuf->free_list.list)) {
                return NULL;
        }

        return list_entry(wbuf->free_list.list.next, lsv_wbuf_chunk_t, hook);
}

/**
 * @brief Count writable pages remaining across the free list.
 *
 * CLEAN chunks contribute their unused tail; POST_COMMIT chunks will be
 * recycled empty and contribute a full chunk worth of pages.  The last
 * page slot of each chunk is reserved (hence the -1).
 *
 * @param lsv_info volume
 * @return number of free pages
 */
inline int lsv_wbuffer_chunk_get_free_page_number(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        struct list_head *pos;
        lsv_wbuf_chunk_t *chunk;

        int page_count = 0;
        /* BUGFIX(idiom): the loop never deletes entries, so the _safe
         * iteration variant (and its unused cursor `n`) was unnecessary. */
        list_for_each(pos, &wbuf->free_list.list) {
                chunk = list_entry(pos, lsv_wbuf_chunk_t, hook);
                if (chunk->chunk_buf->in_use == LSV_WBUF_CHUNK_CLEAN) {
                        page_count += LSV_WBUF_PAGE_NUM - 1 - chunk->chunk_buf->page_idx;
                } else if (chunk->chunk_buf->in_use == LSV_WBUF_CHUNK_POST_COMMIT){
                        page_count += LSV_WBUF_PAGE_NUM - 1;
                } else {
                        /* PRE_COMMIT chunks never sit on the free list. */
                        YASSERT(0);
                }
        }

        return page_count;
}

/**
 * @brief Check whether the request [off, off+size) fits in the free pages.
 *
 * NOTE(review): the previous header claimed a side effect ("may remove a
 * full head chunk"), but the current body only computes availability —
 * confirm against callers before relying on either reading.
 *
 * @param lsv_info volume
 * @param off request offset in bytes
 * @param size request size in bytes
 * @return 1 when enough free pages exist, 0 otherwise
 */
inline int lsv_wbuffer_chunk_check(lsv_volume_proto_t *lsv_info, uint64_t off, uint32_t size) {
        int need = lsv_page_num(off, size);
        int avail = lsv_wbuffer_chunk_get_free_page_number(lsv_info);
        int ok = (avail >= need) ? 1 : 0;

        DINFO_NOP("off %llu size %u page_num %u free %u ok %u\n", (LLU)off, size,
                  need,
                  avail,
                  ok);

        return ok;
}

/* "Expand" the pool by reclaiming committed chunks; the pool itself is a
 * fixed size, so GC is the only way to gain pages. */
int lsv_wbuffer_chunk_expand(lsv_volume_proto_t *lsv_info) {
        lsv_wbuffer_chunk_gc(lsv_info);

        return 0;
}

/**
 * @brief Block (coroutine wait) until the request fits in free pages.
 *
 * Re-checks after every wakeup: a signal on full_cond means external state
 * changed, so availability must be re-evaluated before proceeding.
 *
 * @param lsv_info volume
 * @param off request offset in bytes
 * @param size request size in bytes
 * @return 0
 */
inline int lsv_wbuffer_chunk_ensure(lsv_volume_proto_t *lsv_info, uint64_t off, uint32_t size) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;

        /* The check itself has no side effects and cannot yield. */
        while (!lsv_wbuffer_chunk_check(lsv_info, off, size)) {
                co_cond_wait(&wbuf->full_cond);
        }

        /* Enough pages exist, so the free list cannot be empty. */
        YASSERT(get_current_chunk(lsv_info) != NULL);

        return 0;
}

/**
 * @brief Assert that pages are appended in strict chunk/page order.
 *
 * Within one chunk page_idx must advance by exactly one; on a chunk switch
 * ids advance by one (wrapping back to 1) and the new chunk starts at
 * page 0.  Records the (chunk, page_idx) pair for the next call.
 *
 * @param lsv_info volume (holds the last-seen history)
 * @param chunk chunk receiving the page; must not be NULL
 * @param page_idx index being written; last slot is reserved
 * @return 0
 */
static inline int __check_page_history(lsv_volume_proto_t *lsv_info, lsv_wbuf_chunk_t *chunk, int page_idx) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_chunk_t *prev = wbuf->last_chunk;

        YASSERT(chunk != NULL);
        YASSERT(page_idx < LSV_WBUF_PAGE_NUM - 1);

        if (prev == chunk) {
                /* Same chunk: strictly sequential pages. */
                YASSERT(page_idx == wbuf->last_page_idx + 1);
        } else if (prev != NULL) {
                /* Chunk switch: id in [1, WBUF_CHUNK_NUM], wraps to 1. */
                YASSERT(chunk->id == prev->id + 1 || chunk->id == 1);
                YASSERT(page_idx == 0);
        }

        wbuf->last_chunk = chunk;
        wbuf->last_page_idx = page_idx;

        return 0;
}

/**
 * @brief Append one (already aligned) page to the current wbuf chunk.
 *
 * Runs in three stages: ensure/prepare (pick chunk + page slot, stamp
 * lsn/psn/snap_id), data move (pop payload into the chunk buffer, write
 * meta + index), and publish (bump page_idx, push to log when full).
 *
 * @todo split this strictly into two phases: prepare and append
 *
 * @param lsv_info volume
 * @param io_opt lsn/lba/size of the page; lba must be page aligned
 * @param page_buf source buffer; io_opt->size bytes are consumed from it
 * @return 0
 */
int lsv_wbuffer_chunk_append_page(lsv_volume_proto_t *lsv_info, lsv_io_opt_t *io_opt, buffer_t *page_buf) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_chunk_t *chunk;
        lsv_chunk_buf_t *chunk_buf;
        lsv_log_meta_t *hlog_arg;
        int page_idx;
        uint32_t snap_id;

        // check: may coroutine-wait until enough free pages exist
        lsv_wbuffer_chunk_ensure(lsv_info, io_opt->lba, io_opt->size);

        // prepare 1: pick the target slot and stamp ordering metadata
        {
                chunk = get_current_chunk(lsv_info);
                YASSERT(chunk != NULL);

                // lazily recycle the head if it is still POST_COMMIT
                __reuse_free_list_head(lsv_info);

                chunk_buf = chunk->chunk_buf;
                page_idx = chunk_buf->page_idx;

                __check_page_history(lsv_info, chunk, page_idx);

                // TODO LSNs can arrive out of order (not monotonic here).
                // LSN is the WAL order; the in-memory path need not follow it,
                // so only keep the max per chunk.
                if (chunk_buf->chunk_lsn < io_opt->lsn) {
                        chunk_buf->chunk_lsn = io_opt->lsn;
                }

                chunk_buf->chunk_psn = (++wbuf->last_psn);

                snap_id = lsv_bitmap_snap_get_max_id(lsv_info);
        }

        // just do it: move data, then meta, then index
        {
                // @pre pages are 4K aligned full writes; 512-byte completion
                // to 4K was already done by the caller

                uint32_t buflen = page_buf->len;

                // update data: pop the payload straight into the chunk slot
                void *base = (void *) (&chunk_buf->page_buf) + page_idx * LSV_PAGE_SIZE;
                mbuffer_popmsg(page_buf, base, io_opt->size);

                YASSERT(buflen - page_buf->len == io_opt->size);

                uint32_t crc = page_crc32(base);

                DINFO("pop buf %p %u %u %u lba %llu crc %u to %p %u\n", page_buf,
                       buflen,
                       page_buf->len,
                       io_opt->size,
                       (LLU)io_opt->lba,
                       crc,
                       chunk_buf,
                       page_idx);

                // update meta (slot 0 is the log header, hence +1)
                hlog_arg = chunk_buf->hlog_arg;
                hlog_arg[page_idx + 1].hlog.lba = io_opt->lba;
                hlog_arg[page_idx + 1].hlog.snap_id = snap_id;
                // hlog_arg[page_idx + 1].hlog.size = io_opt->size;
                hlog_arg[page_idx + 1].hlog.crc = crc;

                // update index so reads can find this page in the wbuf
                lsv_wb_hash_index_insert_v2(lsv_info, io_opt->lba, chunk, page_idx);

                chunk_buf->fill_count++;
        }

        // prepare 2: publish the page and push the chunk when full
        {
                DINFO("vol %llu psn %llu %llu lsn %llu chk_lsn %llu lba %llu size %d chunk %u %p pg_idx %d snap_id %u\n",
                       (LLU)lsv_info->ino,
                       (LLU)wbuf->last_psn,
                       (LLU)wbuf->commit_psn,
                       (LLU)io_opt->lsn,
                       (LLU)chunk_buf->chunk_lsn,
                       (LLU)io_opt->lba,
                       io_opt->size,
                       chunk->id,
                       chunk,
                       page_idx,
                       snap_id);

                // ensure nothing yielded since the slot was read: page_idx
                // must be unchanged
                YASSERT(page_idx == chunk_buf->page_idx);

                // chunk watermark: points at the next free page and doubles
                // as the page count; 0 means empty, 255 means full
                chunk_buf->page_idx++;

                if (chunk_is_full(chunk_buf)) {
                        lsv_wbuffer_chunk_push_log(lsv_info, chunk, 0, 0);
                }
        }

        return 0;
}

/**
 * @brief Push a chunk's pages toward the log stage.
 *
 * full and flush differ only in the expected page_idx: a full chunk must
 * hold all pages, a flush takes whatever the current head chunk holds.
 *
 * @param lsv_info volume
 * @param chunk chunk to push; must be NULL when flush != 0 (head is used)
 * @param flush non-zero to flush a partially filled head chunk
 * @param is_prepare currently unused
 * @return 0
 */
int lsv_wbuffer_chunk_push_log(lsv_volume_proto_t *lsv_info, lsv_wbuf_chunk_t *chunk, int flush, int is_prepare) {
        lsv_chunk_buf_t *chunk_buf;

        (void) is_prepare;
        if (flush) {
                // only the free-list head needs handling
                YASSERT(chunk == NULL);
                chunk = get_current_chunk(lsv_info);
                if (chunk == NULL || chunk_is_empty(chunk->chunk_buf)) {
                        return 0;
                }

                chunk_buf = chunk->chunk_buf;
                YASSERT(chunk_buf->page_idx > 0);
        } else {
                YASSERT(chunk != NULL);
                chunk_buf = chunk->chunk_buf;
                YASSERT(chunk_buf->page_idx == LSV_WBUF_PAGE_NUM - 1);
        }

        DINFO_NOP("ino %llu chunk %u %p page %d flush %d\n",
               (LLU)lsv_info->ino,
               chunk->id,
               chunk,
               chunk_buf->page_idx,
               flush);

        if (chunk_buf->page_idx > 0) {
                // TODO order
                chunk_buf->hlog_arg[0].head.timestamp = gettime();
                chunk_buf->hlog_arg[0].head.page_count = chunk_buf->page_idx;
                chunk_buf->hlog_arg[0].head.crc = 0;

                // two stages: prepare (list move to PRE_COMMIT) then fill
                __move_free_list_head(lsv_info, chunk);

                /* BUGFIX(simplify): the former flush / non-flush branches both
                 * called lsv_wbuffer2log identically — the dead duplicate
                 * branch is collapsed.  TODO: in prepare mode the pipeline
                 * should enter the log stage only after the fill counter
                 * shows the chunk is fully filled, to preserve commit order. */
                lsv_wbuffer2log(lsv_info, chunk_buf);
        }

        return 0;
}

/**
 * @brief Mark a chunk as committed and trigger reclamation.
 *
 * Only the POST_COMMIT transition is supported here.
 *
 * @param lsv_info volume
 * @param chunk_buf chunk buffer whose state is advanced
 * @param in_use new state; must be LSV_WBUF_CHUNK_POST_COMMIT
 * @return 0
 */
int lsv_wbuffer_chunk_commit(lsv_volume_proto_t *lsv_info, lsv_chunk_buf_t *chunk_buf, int in_use) {
        // TODO error handling
        YASSERT(in_use == LSV_WBUF_CHUNK_POST_COMMIT);
        chunk_buf->in_use = in_use;

        /* Recycle every chunk that is now fully committed. */
        lsv_wbuffer_chunk_gc(lsv_info);

        return 0;
}

/**
 * @brief Reclaim committed chunks from the wait list, in order.
 *
 * Walks the wait list front to back; POST_COMMIT chunks move back to the
 * free list.  The walk stops at the first PRE_COMMIT chunk to keep commit
 * order — nothing behind an uncommitted chunk may be recycled.
 *
 * @param lsv_info volume
 * @return 0
 */
int lsv_wbuffer_chunk_gc(lsv_volume_proto_t *lsv_info) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        struct list_head *cursor, *next;
        lsv_wbuf_chunk_t *chunk;

        list_for_each_safe(cursor, next, &wbuf->wait_list.list) {
                chunk = list_entry(cursor, lsv_wbuf_chunk_t, hook);

                int state = chunk->chunk_buf->in_use;
                if (state == LSV_WBUF_CHUNK_PRE_COMMIT) {
                        break;
                }

                /* Only PRE_COMMIT and POST_COMMIT chunks may sit here. */
                YASSERT(state == LSV_WBUF_CHUNK_POST_COMMIT);
                __move_wait_list_head(lsv_info, chunk);
        }

        return 0;
}

/**
 * Reserve a chunk slot for one page of an IO without moving any data.
 *
 * Picks the current chunk and page slot, stamps lsn/psn/snap_id, records
 * the reservation in io_frag, and inserts it into the per-LBA FIFO so the
 * commit order is fixed before the fill phase runs.  fill_count is NOT
 * updated here — that happens in the just_do_it fill path.
 *
 * @pre caller already verified capacity (lsv_wbuffer_chunk_check == 1).
 */
static int __lsv_wbuffer_chunk_append_page_prepare(lsv_volume_proto_t *lsv_info, uint64_t io_lsn,
                                                   uint64_t off, uint32_t size, wbuf_io_frag_t *io_frag) {
        lsv_wbuf_t *wbuf = lsv_info->wbuf;
        lsv_wbuf_chunk_t *chunk;
        lsv_chunk_buf_t *chunk_buf;
        int page_idx;
        uint32_t snap_id;

        YASSERT(size > 0 && size <= LSV_PAGE_SIZE);

        int wal_is_ok = lsv_wbuffer_chunk_check(lsv_info, off, size);
        YASSERT(wal_is_ok == 1);

        // prepare 1: pick the slot and stamp ordering metadata
        {
                chunk = get_current_chunk(lsv_info);
                YASSERT(chunk != NULL);

                // lazily recycle the head if it is still POST_COMMIT
                __reuse_free_list_head(lsv_info);

                chunk_buf = chunk->chunk_buf;
                page_idx = chunk_buf->page_idx;

                __check_page_history(lsv_info, chunk, page_idx);

                // unlike the non-prepare path, LSN is required to be
                // monotonic here
                YASSERT(chunk_buf->chunk_lsn <= io_lsn);
                chunk_buf->chunk_lsn = io_lsn;

                chunk_buf->chunk_psn = (++wbuf->last_psn);

                snap_id = lsv_bitmap_snap_get_max_id(lsv_info);
        }

        // record the reservation on the fragment for the fill phase
        io_frag->lba = OFF2LBA(off);
        io_frag->off = off;
        io_frag->size = size;

        io_frag->chunk = chunk;
        io_frag->page_idx = page_idx;
        io_frag->page_num = 1;
        io_frag->snap_id = snap_id;

        // fill_count is not updated here.
        // The LBA hash entry MUST be created in this phase to fix commit
        // order; the fill phase keeps per-IO page order (tied to mbuffer pop).
        lba_fifo_insert_page(lsv_info->lba_fifo, io_frag);

        // prepare 2: publish the slot and push the chunk when full
        {
                DINFO("vol %llu psn %llu %llu lsn %llu chk_lsn %llu lba %llu <%llu, %d> chunk %p %u page %d snap_id %u\n",
                       (LLU)lsv_info->ino,
                       (LLU)wbuf->last_psn,
                       (LLU)wbuf->commit_psn,
                       (LLU)io_lsn,
                       (LLU)chunk_buf->chunk_lsn,
                       (LLU)OFF2LBA(off),
                       (LLU)off,
                       size,
                       chunk,
                       chunk->id,
                       page_idx,
                       snap_id);

                // ensure nothing yielded since the slot was read
                YASSERT(page_idx == chunk_buf->page_idx);
                chunk_buf->page_idx++;

                if (chunk_is_full(chunk_buf)) {
                        lsv_wbuffer_chunk_push_log(lsv_info, chunk, 0, 1);
                }
        }

        return 0;
}

/**
 * @brief Split an IO into page fragments and reserve wbuf slots for each.
 *
 * Allocates one wbuf_io_head_t with an inline fragment array (one fragment
 * per touched page), then reserves a chunk slot per fragment via
 * __lsv_wbuffer_chunk_append_page_prepare.  Data is not moved here; that
 * happens later in the fill phase.
 *
 * @todo capture the invariants
 * @todo strictly check and declare each step's pre/post conditions
 *
 * @param lsv_info volume
 * @param io the IO being written (offset/size/lsn are read)
 * @param buf the IO payload; shared by every fragment, drained during fill
 * @param head [out] receives the allocated head.  OWNERSHIP: caller frees.
 * @return 0 (allocation failure currently asserts)
 */
int lsv_wbuffer_chunk_prepare(lsv_volume_proto_t *lsv_info, const io_t *io, buffer_t *buf, wbuf_io_head_t **head) {
        int ret;
        uint64_t off;
        uint32_t left, step;
        wbuf_io_head_t *_head;
        wbuf_io_frag_t *io_frag;

#if DEBUG_MODE
        CHECK_CONCURRENCY_BEGIN();
#endif

        *head = NULL;

        // @pre capacity must already hold for the whole IO
        int wal_is_ok = lsv_wbuffer_chunk_check(lsv_info, io->offset, io->size);
        YASSERT(wal_is_ok == 1);

        int page_num = lsv_page_num(io->offset, io->size);
        int page_count = 0;

        // head and fragment array in a single allocation
        // @malloc ERR1
        ret = ymalloc((void **)&_head, sizeof(wbuf_io_head_t) + sizeof(wbuf_io_frag_t) * page_num);
        if (unlikely(ret)) {
                YASSERT(0);
        }

        _head->lsv_info = lsv_info;
        _head->io = io;
        _head->buf = buf;

        _head->frag_num = page_num;

        off = io->offset;
        left = io->size;

        // walk the IO page by page; the first/last fragments may be partial
        while (left > 0) {
                // bytes remaining in the current page, capped by what's left
                step = LSV_PAGE_SIZE - off % LSV_PAGE_SIZE;
                step = _min(step, left);

                io_frag = &_head->frags[page_count];

                // reserves the chunk slot and fixes commit order via the FIFO
                __lsv_wbuffer_chunk_append_page_prepare(lsv_info, io->lsn, off, step, io_frag);

                io_frag->lsv_info = lsv_info;
                io_frag->io = io;
                io_frag->buf = buf;
                // marks the fragment for the just_do_it fill path
                io_frag->is_fill = 1;

                io_frag->status = WBUF_IO_FRAG_PREPARE;
                io_frag->__enter_count_fill = 0;
                io_frag->__enter_count_write = 0;
                co_cond_init(&io_frag->cond);

                off += step;
                left -= step;
                page_count++;
        }

        YASSERT(left == 0);
        YASSERT(page_count == page_num);

        *head = _head;

#if DEBUG_MODE
        CHECK_CONCURRENCY_END();
#endif

        return 0;
}

/**
 * @brief Fill phase: copy the IO's data into the slots reserved by prepare.
 *
 * 2PL-style locking: acquire the write lock for every fragment's LBA up
 * front, fill every fragment in order, then release all locks.  Fragments
 * within one IO are processed serially (mbuffer pop order matters).
 *
 * NOTE(review): locks are taken in fragment order, not sorted by LBA —
 * presumably safe because one IO's fragments have strictly increasing
 * LBAs; confirm against the prepare-phase splitting.
 *
 * @note IO-internal processing is serial; is fork-and-join worth it?
 * @todo running this function concurrently causes many problems
 *
 * @param lsv_info volume
 * @param head prepared IO head from lsv_wbuffer_chunk_prepare
 * @return 0 (fill errors currently assert)
 */
int lsv_wbuffer_chunk_fill(lsv_volume_proto_t *lsv_info, wbuf_io_head_t *head) {
        int ret;
        wbuf_io_frag_t *io_frag;

        // Currently the IO is processed serially; concurrent processing
        // would create too many tasks.

        // growing phase: lock every LBA this IO touches
        for (int i=0; i < head->frag_num; i++) {
                io_frag = &head->frags[i];

                uint64_t lba = OFF2LBA(io_frag->off);
                ltable_wrlock(&lsv_info->lock_table, lba);
        }

        for (int i=0; i < head->frag_num; i++) {
                io_frag = &head->frags[i];

                // data is already associated with a wbuf chunk slot
                // data is not yet page aligned (alignment happens inside)

                DINFO("pop io %p %llu buf %p %u lba %llu %llu %u chunk %p %u page %u snap %u\n",
                       head->io,
                       (LLU)head->io->lsn,
                       head->buf,
                       head->buf->len,
                       (LLU)OFF2LBA(io_frag->off),
                       (LLU)io_frag->off,
                       io_frag->size,
                       io_frag->chunk,
                       io_frag->chunk->id,
                       io_frag->page_idx,
                       io_frag->snap_id);

                ret = lsv_page_write(io_frag);
                if (ret < 0) {
                        // TODO error handling
                        YASSERT(0);
                }

#if 0
                // TODO when filling completes, submit to the pipeline.
                // The chunk already entered the wait list during prepare and
                // its in_use already switched to LSV_WBUF_CHUNK_PRE_COMMIT.
                if (chunk_is_full(chunk_buf)) {
                        if (chunk_buf->fill_count == chunk_buf->page_idx) {
                                lsv_wbuffer2log(lsv_info, chunk_buf);
                        }
                }
#endif
        }

        // shrinking phase: release every LBA lock
        for (int i=0; i < head->frag_num; i++) {
                io_frag = &head->frags[i];

                uint64_t lba = OFF2LBA(io_frag->off);
                ltable_unlock(&lsv_info->lock_table, lba);
        }

#if 0
        ret = schedule_yield(__FUNCTION__, NULL, NULL);
        if (unlikely(ret)) {
                YASSERT(0);
        }
#endif

        // every fragment consumed its slice, so the IO buffer must be empty
        YASSERT(head->buf->len == 0);
        return 0;
}

/**
 * @note This is the very first place data is written; analysis and data
 * validation can start here, comparing lba and crc.
 *
 * At this point the lich chunk_id is not yet bound — binding happens when
 * the log hits disk.  The bitmap update performs its own crc check too.
 *
 * Copies one full, aligned page into the chunk slot reserved during
 * prepare, then writes its hlog meta, inserts the read index, and bumps
 * the fill counter — kicking the log pipeline once the chunk is both
 * full and completely filled.
 *
 * @param io_frag fragment with a reserved (chunk, page_idx) slot
 * @param buf source buffer holding at least one full page
 * @return 0
 */
int lsv_wbuffer_chunk_append_page_just_do_it(wbuf_io_frag_t *io_frag, buffer_t *buf) {
        YASSERT(io_frag->lba % LSV_PAGE_SIZE == 0);
        YASSERT(io_frag->size <= LSV_PAGE_SIZE);
        YASSERT(buf != NULL);
        YASSERT(buf->len >= LSV_PAGE_SIZE);

        YASSERT(io_frag != NULL);
        YASSERT(io_frag->page_idx >= 0 && io_frag->page_idx < LSV_WBUF_PAGE_NUM - 1);

        lsv_wbuf_chunk_t *chunk = io_frag->chunk;
        lsv_chunk_buf_t *chunk_buf = chunk->chunk_buf;
        YASSERT(chunk->chunk_buf != NULL);

        lsv_volume_proto_t *lsv_info = io_frag->lsv_info;

        uint32_t buflen = buf->len;

        // update data: pop one full page into the reserved slot
        void *base = (void *) (&chunk_buf->page_buf) + io_frag->page_idx * LSV_PAGE_SIZE;
        mbuffer_popmsg(buf, base, LSV_PAGE_SIZE);


        YASSERT(buflen - buf->len == LSV_PAGE_SIZE);
#ifdef __PAGE_CRC32__
        uint32_t crc = page_crc32(base);
#else
	uint32_t crc = 0;
#endif
        DINFO("refmap write lba %llu crc %u <%llu,%u> to chunk %p %d page %d buf %u %u\n",
              (LLU)OFF2LBA(io_frag->off), crc,
              (LLU)io_frag->off, io_frag->size,
              chunk, chunk->id, io_frag->page_idx,
              buflen, buf->len);

        YASSERT(chunk->chunk_buf != NULL);

        // update meta (slot 0 is the log header, hence +1)
        lsv_log_meta_t *hlog_arg;
        hlog_arg = chunk_buf->hlog_arg;
        hlog_arg[io_frag->page_idx + 1].hlog.lba = io_frag->lba;
        hlog_arg[io_frag->page_idx + 1].hlog.snap_id = io_frag->snap_id;
        // hlog_arg[io_frag->page_idx + 1].hlog.size = LSV_PAGE_SIZE;
        hlog_arg[io_frag->page_idx + 1].hlog.crc = crc;

        // update index so wbuf reads can locate this page
        lsv_wb_hash_index_insert_v2(lsv_info, io_frag->lba, io_frag->chunk, io_frag->page_idx);

        // TODO update fill_count; the pipeline acts on this value
        chunk_buf->fill_count++;
        if (chunk_is_fill(chunk_buf)) {
                // full and flush modes must be distinguished here
                if (chunk_is_full(chunk_buf)) {
                        lsv_wbuf_t *wbuf = lsv_info->wbuf;
                        pipeline_do_log(&wbuf->pipeline);

                        YASSERT(chunk->id == chunk->chunk_buf->id);
                        DINFO("fill chunk_idx %d page_idx %d\n", chunk->id, chunk->chunk_buf->page_idx);
                }
        }

        return 0;
}

/**
 * @brief Fill phase v2: drive each fragment through the per-LBA FIFO.
 *
 * Fragments are processed strictly in order.  A fork-and-join parallel
 * variant is conceivable but incompatible with the mbuffer structure:
 * mbuffer pop is inherently sequential, not offset-addressable.
 *
 * @param lsv_info volume
 * @param head prepared IO head
 * @return 0 (fill errors currently assert)
 */
int lsv_wbuffer_chunk_fill_v2(lsv_volume_proto_t *lsv_info, wbuf_io_head_t *head) {
        for (int i = 0; i < head->frag_num; i++) {
                wbuf_io_frag_t *frag = &head->frags[i];

                DINFO_NOP("%u/%u lsn %llu lba %llu io_frag %p\n",
                      i,
                      head->frag_num,
                      (LLU)frag->io->lsn,
                      (LLU)frag->lba,
                      frag);

                int rc = lba_fifo_fill_page(lsv_info->lba_fifo, lsv_page_write_with_align, frag);
                if (unlikely(rc)) {
                        YASSERT(0);
                }
        }

        // Each page is filled synchronously in order, so reaching here
        // without draining the IO buffer indicates a bug.
        YASSERT(head->buf->len == 0);
        return 0;
}
